package org.budo.warehouse.dao.impl;

import java.io.Serializable;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.budo.mybatis.dao.MybatisDao;
import org.budo.support.dao.page.Page;
import org.budo.support.dao.page.PageModel;
import org.budo.support.exception.business.BusinessException;
import org.budo.support.lang.util.ArrayUtil;
import org.budo.support.lang.util.MapUtil;
import org.budo.warehouse.dao.api.IEntryBufferDao;
import org.budo.warehouse.service.entity.EntryBuffer;
import org.springframework.stereotype.Repository;

/**
 * @author limingwei
 */
@Repository
public class EntryBufferDaoImpl implements IEntryBufferDao {
    @Resource
    private MybatisDao mybatisDao;

    @Override
    public Integer countNotFlushedByPipelineId(Integer pipelineId) {
        String sql = " SELECT COUNT(1) FROM t_entry_buffer WHERE pipeline_id=#{pipelineId} AND ( flushed_at IS NULL OR flushed_at = '') ";
        Map<String, Object> parameter = MapUtil.stringObjectMap("pipelineId", pipelineId);
        return mybatisDao.findBySql(Integer.class, sql, parameter);
    }

    @Override
    public List<EntryBuffer> listNotFlushedByPipelineId(Integer pipelineId, Page page) {
        String sql = " SELECT * FROM t_entry_buffer WHERE pipeline_id=#{pipelineId} AND ( flushed_at IS NULL OR flushed_at = '') ";
        Map<String, Object> parameter = MapUtil.stringObjectMap("pipelineId", pipelineId);
        return mybatisDao.listBySql(EntryBuffer.class, sql, parameter, page);
    }

    @Override
    public Timestamp findNotFlushedMaxCreatedAtByPipelineId(Integer pipelineId) {
        String sql = " SELECT created_at FROM t_entry_buffer WHERE pipeline_id=#{pipelineId} ORDER BY created_at DESC LIMIT 1";
        Map<String, Object> parameter = MapUtil.stringObjectMap("pipelineId", pipelineId);
        return mybatisDao.findBySql(Timestamp.class, sql, parameter);
    }

    @Override
    public Integer updateFlushedAtByIds(Timestamp flushedAt, List<Integer> ids) {
        String sql = " UPDATE t_entry_buffer SET flushed_at=#{flushedAt} WHERE id IN (#{ids}) ";
        Map<String, Object> parameter = MapUtil.stringObjectMap("ids", ids, "flushedAt", flushedAt);
        return mybatisDao.updateBySql(sql, parameter);
    }

    @Override
    public Integer[] insertBatch(List<EntryBuffer> entryBuffers) {
        try {
            Serializable[] ids = mybatisDao.insertBatch(EntryBuffer.class, entryBuffers);
            return ArrayUtil.toIntegerArray(ids);
        } catch (Throwable e) {
            String error = "" + e;
            if (error.contains("SQLException: Incorrect string value")) {
                throw new BusinessException(error); // utf8-mb4 问题
            }

            throw new RuntimeException(e);
        }
    }

    @Override
    public Integer insert(EntryBuffer entryBuffer) {
        return (Integer) mybatisDao.insert(EntryBuffer.class, entryBuffer);
    }

    @Override
    public Timestamp findMaxCreatedAtByDataNodeId(Integer dataNodeId) {
        String sql = " SELECT created_at FROM t_entry_buffer " //
                + " WHERE pipeline_id IN ( SELECT id FROM t_pipeline WHERE source_datanode_id = #{dataNodeId} ) " //
                + " ORDER BY created_at DESC LIMIT 1";
        Map<String, Object> parameter = MapUtil.stringObjectMap("dataNodeId", dataNodeId);
        return mybatisDao.findBySql(Timestamp.class, sql, parameter);
    }

    @Override
    public Integer countByExample(EntryBuffer entryBuffer) {
        String sql = " SELECT COUNT(1) FROM t_entry_buffer WHERE rows=#{rows} ";
        Map<String, Object> parameter = MapUtil.stringObjectMap("rows", entryBuffer.getRows());
        return mybatisDao.findBySql(Integer.class, sql, parameter);
    }

    @Override
    public Integer deleteByFlushedAtLessThan(Timestamp flushedAt) {
        String sql = " DELETE FROM t_entry_buffer WHERE flushed_at IS NOT NULL AND flushed_at!='' AND flushed_at<#{flushedAt} ";
        Map<String, Object> parameter = MapUtil.stringObjectMap("flushedAt", flushedAt);
        return mybatisDao.deleteBySql(sql, parameter);
    }

    @Override
    public List<Integer> listIdByFlushedAtLessThan(Timestamp flushedAt) {
        String sql = " SELECT id FROM t_entry_buffer WHERE flushed_at IS NOT NULL AND flushed_at!='' AND flushed_at<#{flushedAt} ";
        Map<String, Object> parameter = MapUtil.stringObjectMap("flushedAt", flushedAt);
        return mybatisDao.listBySql(Integer.class, sql, parameter, Page.max());
    }

    @Override
    public Boolean deleteById(Integer id) {
        return mybatisDao.deleteById(EntryBuffer.class, id);
    }

    @Override
    public PageModel<EntryBuffer> list(Page page) {
        List<EntryBuffer> list = mybatisDao.list(EntryBuffer.class, page);
        return new PageModel<EntryBuffer>(list, page);
    }
}